Amundsenにロードしたデータを削除するスクリプトを検証してみた
どーもsutoです。
今回はAmundsenにロードしたデータを削除するスクリプトを検証して、実際にデータ削除を行う手順をご紹介します。
Amundsenとは
概要やアーキテクチャについてはこちらの記事が参考になります。
【参考】データロードの記事
セットアップからデータテーブルのロードまでの記事はこちら
- OSSデータカタログAmundsenにRedshiftメタデータをロードしてみた
- OSSデータカタログAmundsenにGlueメタデータをロードしてみた
- OSSデータカタログAmundsenにOracleのメタデータをロードしてみた
- 色々なDBに使えるOSSデータカタログAmundsenからAmazon Athenaのメタデータを取得してみた
公式のREADMEを見てみる
Amundsenに取り込んだデータを削除するための方法はGitの以下のREADME.md内「Removing stale data in Neo4j」に記述があります。
実行スクリプトとなるのはtaskフォルダにあるneo4j_staleness_removal_task.pyのようです。
削除方法は2種類あります。
-
「published_tag」を使用
- 例では、一言で表すと
2020-03-31
以前に取り込んだデータを削除する処理となります。
- 例では、一言で表すと
task = Neo4jStalenessRemovalTask() job_config_dict = { 'job.identifier': 'remove_stale_data_job', 'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint, 'task.remove_stale_data.neo4j_user': neo4j_user, 'task.remove_stale_data.neo4j_password': neo4j_password, 'task.remove_stale_data.staleness_max_pct': 10, 'task.remove_stale_data.target_nodes': ['Table', 'Column'], 'task.remove_stale_data.job_publish_tag': '2020-03-31' } job_config = ConfigFactory.from_dict(job_config_dict) job = DefaultJob(conf=job_config, task=task) job.launch()
-
「publisher_last_updated_epoch_ms」を使用
- 例では、特定のノードまたはリレーションが過去3日間公開されていないデータを削除するものです。
task = Neo4jStalenessRemovalTask() job_config_dict = { 'job.identifier': 'remove_stale_data_job', 'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint, 'task.remove_stale_data.neo4j_user': neo4j_user, 'task.remove_stale_data.neo4j_password': neo4j_password, 'task.remove_stale_data.staleness_max_pct': 10, 'task.remove_stale_data.target_relations': ['READ', 'READ_BY'], 'task.remove_stale_data.milliseconds_to_expire': 86400000 * 3 } job_config = ConfigFactory.from_dict(job_config_dict) job = DefaultJob(conf=job_config, task=task) job.launch()
またjob_config_dict
内で使用しているパラメータのデフォルト値がneo4j_staleness_removal_task.py
のなかに記述されています。
DEFAULT_CONFIG = ConfigFactory.from_dict({BATCH_SIZE: 100, NEO4J_MAX_CONN_LIFE_TIME_SEC: 50, NEO4J_ENCRYPTED: True, NEO4J_VALIDATE_SSL: False, STALENESS_MAX_PCT: 5, TARGET_NODES: [], TARGET_RELATIONS: [], STALENESS_PCT_MAX_DICT: {}, MIN_MS_TO_EXPIRE: 86400000, DRY_RUN: False})
このなかでも注目するパラメータがSTALENESS_MAX_PCT: 5
で、これは「削除対象となるデータがAmundsenに取り込んでいる全データの5%を超えていた場合、削除が実行されずタスクが中止される」ことを意味します。
つまり開発中や運用中の誤動作や設定ミスなどによるデータ損失リスクを回避するためのセーフティ機能の役割を果たしています。削除する場合はこの値を調整しながら実行していく必要があります。
また、公式にも書いていますがデータ削除の実行はリスクのある行為なので、とくに本番運用中の場合はDRY_RUN: True
にしておくことを推奨しているようですね。
実際にやってみた
今回は、以前の記事で検証としてRedshiftやGlueからデータを取り込んでいたので、そのクリーンアップを兼ねて全データの一括削除を行ってみました。
Amundsenのコンテナ起動済みで、RedshiftやGlueのデータがロードされている状態からスタートします。
まずはロード処理の際モジュールをインストール等を行った仮想環境に入り、neo4j_staleness_removal_task.py
を開きます。
cd amundsen/databuilder source venv/bin/activate vi databuilder/task/neo4j_staleness_removal_task.py
以下のようにjob_config_dict
に使用する変数(neo4j_endpointなど)の定義を追記、メイン関数にデータ削除ジョブを実行するコードを追記します。
import logging import textwrap # ~~省略~~ MARKER_VAR_NAME = 'marker' # 変数定義でneo4jプロキシの接続情報を格納 neo_host = None NEO4J_ENDPOINT = f'bolt://{neo_host or "localhost"}:7687' neo4j_endpoint = NEO4J_ENDPOINT neo4j_user = 'neo4j' neo4j_password = 'test' class Neo4jStalenessRemovalTask(Task): # ~~省略~~ finally: LOGGER.debug('Cypher query execution elapsed for %i seconds', time.time() - start) # データ削除ジョブの呼び出しを追記 if __name__ == "__main__": task = Neo4jStalenessRemovalTask() job_config_dict = { 'job.identifier': 'remove_stale_data_job', 'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint, 'task.remove_stale_data.neo4j_user': neo4j_user, 'task.remove_stale_data.neo4j_password': neo4j_password, 'task.remove_stale_data.staleness_max_pct': 101, 'task.remove_stale_data.target_nodes': ['Table', 'Column'], 'task.remove_stale_data.job_publish_tag': '2021-09-13' } job_config = ConfigFactory.from_dict(job_config_dict) job = DefaultJob(conf=job_config, task=task) job.launch()
今回のneo4jプロキシのパラメータはデフォルト値です。また、job_publish_tag
にはデータをロードしたのは2021-09-12以前の日付でしたので、コードには2021-09-13にしています。
ここで全データを削除するためstaleness_max_pct
を101%にしています。(理由は後述)
編集内容を保存したらスクリプトを実行してみます。
python3 databuilder/task/neo4j_staleness_removal_task.py
以下画面のようにロードしてしたデータが削除されました。
検索結果がコンテナサーバのキャッシュ上に残っている可能性がありますので、表示がおかしい場合はキャッシュクリアやDockerの再起動などを試してみてください。
ちょっとハマったところ
staleness_max_pctですが、ここは100%で良いのでは?と思って実行してみたのですが、以下のように上限によるエラーとなってしまいました。
(venv) [ec2-user@ip-10-0-0-198 databuilder]$ python3 databuilder/task/neo4j_staleness_removal_task.py Traceback (most recent call last): File "databuilder/task/neo4j_staleness_removal_task.py", line 296, in <module> job.launch() File "/home/ec2-user/amundsen/databuilder/venv/lib64/python3.7/site-packages/amundsen_databuilder-6.0.3-py3.7.egg/databuilder/job/job.py", line 76,in launch File "/home/ec2-user/amundsen/databuilder/venv/lib64/python3.7/site-packages/amundsen_databuilder-6.0.3-py3.7.egg/databuilder/job/job.py", line 66,in launch File "databuilder/task/neo4j_staleness_removal_task.py", line 121, in run self.validate() File "databuilder/task/neo4j_staleness_removal_task.py", line 131, in validate self._validate_node_staleness_pct() File "databuilder/task/neo4j_staleness_removal_task.py", line 241, in _validate_node_staleness_pct types=self.target_nodes) File "databuilder/task/neo4j_staleness_removal_task.py", line 217, in _validate_staleness_pct raise Exception(f'Staleness percentage of {type_str} is {stale_pct} %. ' Exception: Staleness percentage of Table is 100.0 %. Stopping due to over threshold 100 %
100%(全データ削除)のため安全装置としてタスクが自動停止したんですね。
開発側の仕様としてそのようになっているので、強制的に全データを削除したい場合は100%より高い数値を設定してしまえばタスク停止を回避して全データを削除することができました。(percentの定義としてなんだか裏技にようになってしまいましたが。。。)